# Future & FutureTask 详解

# FutureTask 的作用

Future 接口代表一个异步计算操作的结果,并且提供了各种方法用来控制这个计算操作,比如等待完成、获取结果、取消计算、计算状态。区别于阻塞等待,将计算过程交给子线程之后,主线程可以执行其他操作,等待异步操作完成后再获取结果,这样可以提高整个程序的运行效率。

而 FutureTask 是 Future 的接口实现,提供了上述各个操作的实现之外,本身也是一个任务,实现了 Runnable 接口,可以作为任务传递给线程。

FutureTask 通过为任务设置状态变化和封装等待线程链表的方式,来保证任务执行、结果获取的逻辑性,以及阻塞等待获取结果的能力。

# FutureTask 的使用

常见的使用方式是将 Callable 通过 ExecutorService.submit() 方法提交给线程池,这样线程池会返回一个 Future 类,底层实现就是 FutureTask。

示例一:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author linjinjia
 * @date 2023/7/12 12:07
 */
public class FutureTaskExample {

    /**
     * 求和任务
     */
    private static class SumTask implements Callable<Integer> {
        /**
         * 起始数
         */
        private final int start;
        /**
         * 终止数
         */
        private final int end;

        public SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public Integer call() throws Exception {
            System.out.printf("%s: 计算区间:[%d, %d)\n", Thread.currentThread().getName(), start, end);
            int sum = 0;
            for (int i = start; i < end; i++) {
                sum += i;
            }
            // 睡眠 1 秒模拟计算耗时
            TimeUnit.SECONDS.sleep(1);
            return sum;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int n = 100, step = 20;
				// 线程池最多有 3 条线程
        ExecutorService executor = Executors.newFixedThreadPool(3);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= n; i += step) {
            // 添加任务
            Future<Integer> future = executor.submit(new SumTask(i, i + step));
            futures.add(future);
        }

        int sum = 0;
        for (Future<Integer> future : futures) {
            // 等待计算结果
            sum += future.get();
        }
        System.out.println(sum);

        // 关闭线程池,回收资源
        executor.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

示例二:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/**
 * @author linjinjia
 * @date 2023/7/12 12:07
 */
public class FutureTaskExample2 {

    private static class SumTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            // 睡眠 1 秒模拟计算耗时
            TimeUnit.SECONDS.sleep(2);
            return 1;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 线程池只有一条线程
        ExecutorService executor = Executors.newFixedThreadPool(1);

        FutureTask<Integer> task1 = new FutureTask<>(new SumTask());
        FutureTask<Integer> task2 = new FutureTask<>(new SumTask());

        executor.submit(task1);
        executor.submit(task2);

        System.out.println("task2 已完成:" + task2.isDone());
        System.out.println("task1 结果: " + task1.get());
        System.out.println("task2 已完成:" + task2.isDone());
        System.out.println("task2 结果: " + task2.get());
        System.out.println("task2 已完成:" + task2.isDone());

        executor.shutdown();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

# Future 和 FutrueTask 的关系

FutureTask

FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 是继承了 Runnable 和 Future 接口,因此 FutureTask 具备了 Runnbale 的作用和 Future 的能力。

除此之外,FutureTask 还依赖 Callable 作为其内部成员变量。

下面介绍 Future 定义的方法。

public interface Future<V> {

    /**
     * 尝试取消任务,如果任务已经完成、被取消过,则取消失败,或者其他原因导致取消失败。
     * 执行过这个方法之后,接下来调用 isDone 方法永远会返回 true,
     * 如果 cancel 方法返回 true,则 isCancelled 也会返回 true
     *
     * @param mayInterruptIfRunning 如果为 true,则执行任务的线程会被强行中断,否则的会等待任务完成
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 正常完成任务的情况下,该方法会返回 true
     */
    boolean isCancelled();

    /**
     * 返回任务是否已经完成
     * 完成的情况包括:正常结束、异常结束、被取消
     */
    boolean isDone();

    /**
     * 获取结果,如果计算还未结束,
     * @throws CancellationException 如果计算被取消
     * @throws ExecutionException 如果计算过程抛出了一场
     * @throws InterruptedException 如果负责计算的线程在等待时中断
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 超时等待获取结果
     *
     * @param 超时时间
     * @param 时间单位
     * @throws CancellationException 如果计算被取消
     * @throws ExecutionException 如果计算过程抛出了一场
     * @throws InterruptedException 如果负责计算的线程在等待时中断
     * @throws TimeoutException 如果等待超时
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# FutureTask 的状态

为了方便管理 FutureTask,作者为 FutureTask 定义了 77 种状态。

// 表示当前任务的状态,是一个 volatile 变量,这样状态的变化对其他线程可见
private volatile int state;
// 初始状态,表示任务刚创建或者还未完成
private static final int NEW          = 0;
// 中间状态,表示任务已经完成或者抛出了一场,但是任务的结果(异常)还未保存
private static final int COMPLETING   = 1;
// 最终状态,表示任务已经完成并且结果已经保存
private static final int NORMAL       = 2;
// 最终状态,表示任务执行过程中出现了异常,并且异常已经被保存
private static final int EXCEPTIONAL  = 3;
// 最终状态,表示任务在未开始或者开始未完成的情况下,用户调用了 cancel(false) 导致任务取消
private static final int CANCELLED    = 4;
// 中间状态,表示任务在未开始或者开始未完成的情况下,
// 用户调用了 cancel(true) 导致任务被取消且正在中断执行线程(还未成功中断)
private static final int INTERRUPTING = 5;
// 最终状态,表示任务执行线程被中断
private static final int INTERRUPTED  = 6;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

对于大于等于 COMPLETING (即 state != NEW)的状态,都是属于已完成状态,isDone() 方法会返回 true。

FutureTask-state

# FutureTask 源码解析

# 核心属性

/**
 * 任务的状态
 */
private volatile int state;

/**
 * 内置的 Callable 任务,任务结束后置空
 */
private Callable<V> callable;

/**
 * 保存从 get() 方法返回的结果或者异常
 */
private Object outcome; // non-volatile, protected by state reads/writes

/**
 * callable 任务的执行线程,在 run() 方法中使用 CAS 进行设置
 */
private volatile Thread runner;

/**
 * 使用 Treiber 栈保存等待线程
 * Treiber 栈是一种无锁并发栈,其无锁的特性是基于CAS原子操作实现的
 */
private volatile WaitNode waiters;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 构造函数

FutureTask 有两个构造函数,分别用来接收 Callable 和 Runnable 对象。

/**
 * 创建 FutureTask 对象,并执行给定的 Callable 对象
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    // 初始状态
    this.state = NEW;       // ensure visibility of callable
}

/**
 * 创建 FutureTask 对象,并执行给定的 Runnable 对象,
 * 同时还接收一个 result 作为任务成功执行之后的返回结果。
 * 如果不需要返回结果,可以考虑使用 
 * Future<?> f = new FutureTask<Void>(runnable, null) 的方式
 */
public FutureTask(Runnable runnable, V result) {
  this.callable = Executors.callable(runnable, result);
  // 初始状态
  this.state = NEW;       // ensure visibility of callable
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

在第二个构造函数中,对于传入的 Runnable 对象,使用 Executors.callable() 方法成 Callable 对象。该方法时 Executors 中的一个方法,其实现逻辑使用一个 Runnable 适配器,将 Runnable 对象包装成 Callable 对象。

// java.util.concurrent.Executors

public static <T> Callable<T> callable(Runnable task, T result) {
  if (task == null)
    throw new NullPointerException();
  return new RunnableAdapter<T>(task, result);
}

/**
 * Runnable 适配器实现了 Callable 接口
 */
static final class RunnableAdapter<T> implements Callable<T> {
  final Runnable task;
  final T result;
  RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
  }
  public T call() {
    task.run();
    return result;
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 内部类 WaitNode

/**
 * 简单的链表节点
 * 以 Treiber 栈的形式记录等待获取结果的线程
 * 构造函数会记录当前线程
 */
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
1
2
3
4
5
6
7
8
9
10

# 任务的执行 run()

/**
 * 执行任务
 * 
 * 1. 先判断任务状态是否是初始状态 NEW,然后将当前线程设置任务的执行线程;
 * 2. 调用内置的 callable 对象的 run 方法,然后保存执行结果或者异常、更新状态、唤醒等待线程
 * 3. 如果任务被中断,会调用 `handlePossibleCancellationInterrupt` 方法来
 *    保证保证方法退出前,state 进入 INTERRUPTED 状态
 */
public void run() {
    // 确认任务处于初始状态,以及通过 CAS 设置当前线程为执行线程
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 调用 call() 方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 保存异常,更新任务状态和唤醒等待线程
                setException(ex);
            }
            if (ran)
                // 保存结果,更新任务状态和唤醒等待线程
                set(result);
        }
    } finally {
        // 将 runner 设置为 null,防止并发调用 run() 方法
        runner = null;
        int s = state;
        // 如果正在中断中或者已中断,说明调用了 cancel(true) 方法
        // 因此这里需要保证 cancel 方法中把 state 设为 INTERRUPTED
        if (s >= INTERRUPTING)
            // 保证方法退出前,state 进入 INTERRUPTED 状态
            handlePossibleCancellationInterrupt(s);
    }
}

/**
 * 将任务状态更新为 COMPLTING,然后保存结果,再更新为 NORMAL
 * 最终唤醒所有的等待线程
 */
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

/**
 * 将任务状态更新为 COMPLTING,然后保存异常,再更新为 EXCEPTIONAL
 * 最终唤醒所有的等待线程
 */
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
      while (state == INTERRUPTING)
        Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

# 任务的取消 cancel()

/**
 * 尝试取消任务,如果任务已经完成、被取消过,则取消失败,或者其他原因导致取消失败。
 * 执行过这个方法之后,接下来调用 isDone 方法永远会返回 true,
 * 如果 cancel 方法返回 true,则 isCancelled 也会返回 true
 *
 * @param mayInterruptIfRunning 如果为 true,则执行任务的线程会被强行中断,否则的会等待任务完成
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    // 根据 mayInterruptIfRunning 选择进入中断中还是取消
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            // 中断线程
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 将状态更新为已中断
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 唤醒等待线程
        finishCompletion();
    }
    return true;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 等待线程的唤醒 finishCompletion()

/**
 * 唤醒所有的等待线程,
 * 然后调用 done() 方法,并且将 callable 设置为 null
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // 先将 waiters 置为 null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 然后在循环里,从头到尾唤醒节点中封装的线程
            for (;;) {
                // 将节点的 thread 置为 null, 并唤醒线程
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                // 遍历下一个节点
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 结果的获取 get()

get() 方法在 state 为未完成或即将完成的状态下,通过 awaitDone() 方法实现阻塞等待的,并在 report() 方法中根据状态 state 返回结果或者抛出异常。

/**
 * 阻塞等待
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

/**
 * 超时等待
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        // 判断当前状态如果是 NEW 或者 COMPLETING,那么就是超时了
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) 
        throw new TimeoutException();
    return report(s);
}

/**
 * 根据不同的状态返回结果或者抛出异常
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    // 结果
    Object x = outcome;
    if (s == NORMAL)
      // 正常完成
      return (V)x;
    if (s >= CANCELLED)
      // 抛出取消异常,表示处理过程中,任务被取消了
      throw new CancellationException();
    // 抛出执行过程中的异常
    throw new ExecutionException((Throwable)x);
}

/**
 * 将当前线程放入等待列表,根据超时参数进行阻塞
 * 
 * 返回任务的状态 state
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 无限循环,循环体中有很多 if-else 分支,
    // 每次循环会根据条件进入某一个分支 
    for (;;) {
        // 如果当前线程被中断
        // 则不进入等待列表,或者从等待列表中移除
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
				
        int s = state;
        // s > COMPLETING 意味任务已经完成、被取消或者中断
        // 不需要进入等待队列
        if (s > COMPLETING) {  // 分支①
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // 能到达这里,说明 s 为 NEW 或者 COMPLETING
            // 如果任务即将完成,则线程暂时释放 CPU 时间片资源进行等待
            // 下一循环可能会进入分支①
            Thread.yield();
        else if (q == null) // 能到达这里,说明 s 为 NEW
            // 将当前线程封装成等待节点
            q = new WaitNode();
        else if (!queued) // s 为 NEW,且 q 还未入队(可能没入过队,也可能上次入队失败)
            // CAS 的方式将 q 插入到当前 waiters 前面,然后 waiters 指向 q
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
        else if (timed) {  // s 为 NEW,且 q 已经入队,当前线程可以阻塞等待唤醒
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) { // 等待已经超时,将 q 从等待队列移除
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this); // 无限等待
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

# 任务的重启 runAndReset()

runAndReset()run() 方法最大的区别是 runAndReset() 不需要设置返回值,并且在执行过程中如果没有抛异常或者被执行线程被中断,是不会改变任务状态的。它用于执行需要多次执行的任务上。

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 参考文章

  • https://pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html
  • https://www.cnblogs.com/linghu-java/p/8991824.html以及https://www.jianshu.com/p/d61d7ffa6abc
上次更新: 2023/10/15